package com.sccl.autojob.skeleton.mq;

import com.sccl.autojob.util.convert.RegexUtil;
import com.sccl.autojob.util.convert.StringUtils;
import com.sccl.autojob.util.id.IdGenerator;
import com.sccl.autojob.util.id.SystemClock;
import com.sccl.autojob.util.thread.ScheduleTaskUtil;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
 * Context of the in-memory message queue.
 *
 * @author Huang Yongxiang
 * @date 2022/03/18 17:14
 */
@Slf4j
public class MessageQueueContext<M> implements IMessageQueueContext<M>, IProducer<M>, IConsumer<M> {

    /**
     * Default expiring time in milliseconds, used for messages published without an explicit
     * expiring time. A value of -1 makes those messages permanent: they stay queued until a
     * consumer takes them out. Use with caution!
     */
    private long defaultExpiringTime = -1;

    /**
     * Whether an expiring time may be set on an individual message.
     */
    private boolean isAllowSetEntryExpired = false;

    /**
     * Maximum number of topics allowed.
     */
    private int allowMaxTopicCount = 255;

    /**
     * Maximum number of messages allowed in each queue.
     */
    private int allowMaxMessageCountPerQueue;

    /**
     * Policy of the expiration listener.
     */
    private ExpirationListenerPolicy listenerPolicy;

    // Data structure that stores the messages: one queue per topic.
    private Map<String, MessageQueue<MessageEntry<M>>> messageQueues;

    // Set to true once the expiration listener has been started.
    private boolean isOpenListener = false;

    /**
     * Executor that runs the expiration check on a daemon thread.
     */
    private ScheduledExecutorService executorService;

    /**
     * Number of subscriptions per topic.
     */
    private Map<String, AtomicLong> subscriptionCount;

    /**
     * Creates a new {@link Builder} pre-populated with the default settings.
     *
     * @return a fresh builder instance
     */
    public static Builder<Object> builder() {
        Builder<Object> freshBuilder = new Builder<>();
        return freshBuilder;
    }

    /**
     * Private no-op constructor; within this file instances are created by {@link Builder#build()}.
     */
    private MessageQueueContext() {

    }

    /**
     * Post-construction hook. Currently does nothing.
     */
    @PostConstruct
    public void init() {

    }

    /**
     * Registers a new, empty message queue for the given topic.
     * <p>The first successful registration also starts the expiration listener selected by
     * {@code listenerPolicy}.</p>
     *
     * @param topic topic name, must not be empty
     * @return {@code true} if the queue was created; {@code false} if the topic is empty, already
     * registered, the topic limit is reached, or creation failed
     */
    @Override
    public boolean registerMessageQueue(String topic) {
        if (StringUtils.isEmpty(topic)) {
            log.error("创建队列失败，主题为空");
            return false;
        }
        if (messageQueues.containsKey(topic)) {
            return false;
        }
        if (messageQueues.size() >= allowMaxTopicCount) {
            log.error("当前消息容器最大支持{}个主题", allowMaxTopicCount);
            return false;
        }
        try {
            MessageQueue<MessageEntry<M>> messageQueue = new MessageQueue<>(allowMaxMessageCountPerQueue);
            // Create the subscription counter before the queue becomes visible, so that a
            // concurrent subscriber never finds a queue without its counter.
            if (subscriptionCount == null) {
                subscriptionCount = new ConcurrentHashMap<>();
            }
            subscriptionCount.put(topic, new AtomicLong(0));
            messageQueues.put(topic, messageQueue);
            if (!isOpenListener) {
                if (listenerPolicy == ExpirationListenerPolicy.SINGLE_THREAD) {
                    scheduleExpiringCheckSingleThread();
                } else {
                    scheduleExpiringCheckTopicConcurrency();
                }
                isOpenListener = true;
            }
            return true;
        } catch (Exception e) {
            // Pass the throwable as the last argument so the stack trace is not lost.
            log.error("创建队列发生异常：{}", e.getMessage(), e);
        }
        return false;
    }

    /**
     * Tells whether a queue is registered for the given topic.
     *
     * @param topic topic name, may be {@code null}
     * @return {@code true} if the topic is registered; {@code false} otherwise, including for a
     * {@code null} topic
     */
    @Override
    public boolean hasTopic(String topic) {
        // The Builder backs messageQueues with a Hashtable, which throws on null keys.
        return topic != null && messageQueues.containsKey(topic);
    }

    /**
     * Tells whether at least one registered topic matches the given regular expression.
     *
     * @param regexTopic regular expression handed to {@code RegexUtil.isMatch}
     * @return {@code true} as soon as one registered topic matches, {@code false} otherwise
     */
    @Override
    public boolean hasRegexTopic(String regexTopic) {
        for (String registeredTopic : messageQueues.keySet()) {
            if (RegexUtil.isMatch(registeredTopic, regexTopic)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Removes the queue registered for the given topic, if any.
     *
     * @param topic topic whose queue should be removed
     * @return {@code true} if the removal ran without an exception (even when the topic was not
     * registered), {@code false} if an exception occurred
     */
    @Override
    public boolean removeMessageQueue(String topic) {
        try {
            // No explicit System.gc(): the removed queue becomes unreachable and is collected by
            // the JVM on its own; forcing a collection on every removal only adds pauses.
            messageQueues.remove(topic);
            return true;
        } catch (Exception e) {
            log.error("移除消息队列时发生异常：{}", e.getMessage(), e);
        }
        return false;
    }

    /**
     * Publishes a message with its own expiring time without blocking.
     *
     * @param message      payload, must not be {@code null}
     * @param topic        topic of an already registered queue
     * @param expiringTime time to live, must be positive
     * @param unit         unit of {@code expiringTime}, must not be {@code null}
     * @return {@code true} if the message was enqueued; {@code false} if per-message expiry is
     * disabled, the topic is unknown, an argument is invalid, or the queue did not accept it
     */
    @Override
    public boolean publishMessageNoBlock(M message, String topic, long expiringTime, TimeUnit unit) {
        if (!isAllowSetEntryExpired) {
            log.error("不允许设置单个消息的过期时间");
            return false;
        }
        // Look the queue up once: a containsKey() followed by get() can hit a queue that was
        // removed in between (topics are auto-deleted asynchronously by unsubscribe()).
        MessageQueue<MessageEntry<M>> queue = topic == null ? null : messageQueues.get(topic);
        if (queue == null) {
            log.error("发布非阻塞消息失败，所要发布到的队列：{}不存在", topic);
            return false;
        }
        if (expiringTime <= 0 || unit == null) {
            log.error("非法过期时间");
            return false;
        }
        if (message == null) {
            log.error("禁止发布空消息");
            return false;
        }
        MessageEntry<M> messageEntry = new MessageEntry<>();
        messageEntry.setMessageId(IdGenerator.getNextIdAsLong());
        messageEntry.setMessage(message);
        messageEntry.setExpiringTime(unit.toMillis(expiringTime) + SystemClock.now());
        return queue.publishMessage(messageEntry);
    }

    /**
     * Publishes a message with its own expiring time, blocking while the queue is full.
     *
     * @param message      payload, must not be {@code null}
     * @param topic        topic of an already registered queue
     * @param expiringTime time to live, must be positive
     * @param unit         unit of {@code expiringTime}, must not be {@code null}
     * @return {@code true} if the message was enqueued; {@code false} if per-message expiry is
     * disabled, the topic is unknown, an argument is invalid, or the thread was interrupted
     * while waiting (the interrupt status is restored in that case)
     */
    @Override
    public boolean publishMessageBlock(M message, String topic, long expiringTime, TimeUnit unit) {
        if (!isAllowSetEntryExpired) {
            log.error("不允许设置单个消息的过期时间");
            return false;
        }
        // Single lookup avoids a NullPointerException when the topic is removed between a
        // containsKey() check and the subsequent get().
        MessageQueue<MessageEntry<M>> queue = topic == null ? null : messageQueues.get(topic);
        if (queue == null) {
            // This is the blocking variant; the message previously said "non-blocking".
            log.error("发布阻塞消息失败，所要发布到的队列：{}不存在", topic);
            return false;
        }
        if (expiringTime <= 0 || unit == null) {
            log.error("非法过期时间");
            return false;
        }
        if (message == null) {
            log.error("禁止发布空消息");
            return false;
        }
        MessageEntry<M> messageEntry = new MessageEntry<>();
        messageEntry.setMessageId(IdGenerator.getNextIdAsLong());
        messageEntry.setMessage(message);
        messageEntry.setExpiringTime(unit.toMillis(expiringTime) + SystemClock.now());
        try {
            return queue.publishMessageSync(messageEntry);
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can react to it.
            Thread.currentThread().interrupt();
            log.warn("发布可阻塞消息发生异常，等待时被异常占用：{}", e.getMessage());
        }
        return false;
    }


    /**
     * Gets one message, blocking until one is available when it is taken out of the queue.
     * For multicast consumption it is better not to take the message out.
     *
     * @param topic     topic name
     * @param isTakeout whether to remove the message from the queue; when {@code false} the
     *                  method degrades to a non-blocking peek at the head of the queue
     * @return the message, or {@code null} if the topic is unknown, the queue is empty in peek
     * mode, or the thread was interrupted while waiting (the interrupt status is restored)
     * @author Huang Yongxiang
     * @date 2022/3/20 10:55
     */
    @Override
    public M takeMessageBlock(String topic, boolean isTakeout) {
        // Single lookup: the topic may be removed between a containsKey() and a get().
        MessageQueue<MessageEntry<M>> queue = topic == null ? null : messageQueues.get(topic);
        if (queue == null) {
            return null;
        }
        if (!isTakeout) {
            // peek() yields null on an empty queue; guard before dereferencing.
            MessageEntry<M> head = queue.readMessage();
            return head == null ? null : head.message;
        }
        try {
            return queue.takeMessageSync().message;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("可阻塞获取消息发生异常，等待时被异常占用：{}", e.getMessage());
        }
        return null;
    }

    /**
     * Gets one message without blocking.
     *
     * @param topic     topic name
     * @param isTakeout whether to remove the message from the queue ({@code true}) or only peek
     *                  at the head ({@code false})
     * @return the message, or {@code null} if the topic is unknown or the queue is empty
     */
    @Override
    public M takeMessageNoBlock(String topic, boolean isTakeout) {
        MessageQueue<MessageEntry<M>> queue = topic == null ? null : messageQueues.get(topic);
        if (queue == null) {
            return null;
        }
        // poll()/peek() already return null on an empty queue. Checking the result instead of
        // length() beforehand stays correct when another consumer drains the queue in between.
        MessageEntry<M> entry = isTakeout ? queue.takeMessage() : queue.readMessage();
        return entry == null ? null : entry.message;
    }

    /**
     * Gets {@code count} messages by calling {@link #takeMessageBlock(String, boolean)} that
     * many times. Whatever each call returns, including {@code null}, is appended to the result.
     *
     * @param topic     topic name
     * @param count     number of calls to make
     * @param isTakeout whether each message is removed from the queue
     * @return list holding the result of every call, in call order
     */
    @Override
    public List<M> takeMessagesBlock(String topic, int count, boolean isTakeout) {
        List<M> taken = new LinkedList<>();
        int remaining = count;
        while (remaining > 0) {
            taken.add(takeMessageBlock(topic, isTakeout));
            remaining--;
        }
        return taken;
    }

    /**
     * Collects messages from a topic until {@code count} messages were gathered or the wait time
     * elapsed, whichever comes first. At least one attempt is made even with a zero wait time.
     *
     * @param topic     topic name
     * @param count     number of messages after which collection stops
     * @param isTakeout whether each message is removed from the queue; when {@code false} the
     *                  same head message may be collected repeatedly
     * @param waitTime  maximum time to wait, must not be negative
     * @param unit      unit of {@code waitTime}, must not be {@code null}
     * @return the collected messages (possibly fewer than {@code count}); an empty list if the
     * wait time is invalid. If the thread is interrupted the interrupt status is restored and
     * the messages collected so far are returned.
     */
    @Override
    public List<M> takeMessagesBlock(String topic, int count, boolean isTakeout, long waitTime, TimeUnit unit) {
        if (waitTime < 0 || unit == null) {
            log.error("等待时间非法");
            return Collections.emptyList();
        }
        List<M> ms = new LinkedList<>();
        // Measure real elapsed time: counting sleep(1) iterations ignores the time spent taking
        // messages and the fact that sleep(1) may last longer than one millisecond.
        final long timeoutNanos = unit.toNanos(waitTime);
        final long startNanos = System.nanoTime();
        do {
            M message = takeMessageNoBlock(topic, isTakeout);
            if (message != null) {
                ms.add(message);
                if (ms.size() == count) {
                    break;
                }
                if (isTakeout) {
                    // More messages may be ready right away; only back off on an empty queue.
                    // In peek mode the pacing sleep is kept so the list grows at a bounded rate.
                    continue;
                }
            }
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        } while (System.nanoTime() - startNanos < timeoutNanos);
        return ms;
    }

    /**
     * Makes {@code count} non-blocking attempts to get a message and returns the non-null results.
     *
     * @param topic     topic name
     * @param count     number of attempts
     * @param isTakeout whether each message is removed from the queue
     * @return the messages obtained, possibly fewer than {@code count}
     */
    @Override
    public List<M> takeMessagesNoBlock(String topic, int count, boolean isTakeout) {
        List<M> collected = new LinkedList<>();
        int attempt = 0;
        while (attempt < count) {
            attempt++;
            M candidate = takeMessageNoBlock(topic, isTakeout);
            if (candidate == null) {
                continue;
            }
            collected.add(candidate);
        }
        return collected;
    }

    /**
     * Gets one message from every topic matching the regular expression, using
     * {@link #takeMessageBlock(String, boolean)} for each matching topic.
     *
     * @param regexTopic regular expression handed to {@code RegexUtil.isMatch}
     * @param isTakeout  whether each message is removed from its queue
     * @return one entry per matching topic (entries may be {@code null}), or {@code null} when
     * no topic matches
     */
    @Override
    public List<M> takeMessageByRegexTopicBlock(String regexTopic, boolean isTakeout) {
        if (!hasRegexTopic(regexTopic)) {
            return null;
        }
        List<String> matchedTopics = new ArrayList<>();
        for (String candidate : messageQueues.keySet()) {
            if (RegexUtil.isMatch(candidate, regexTopic)) {
                matchedTopics.add(candidate);
            }
        }
        log.debug("获取到正则主题：{}的匹配主题{}个", regexTopic, matchedTopics.size());
        if (matchedTopics.isEmpty()) {
            return null;
        }
        return matchedTopics.stream()
                .map(matched -> takeMessageBlock(matched, isTakeout))
                .collect(Collectors.toCollection(LinkedList::new));
    }

    /**
     * Gets, without blocking, at most one message from every topic matching the regular
     * expression. Topics that yield no message are skipped.
     *
     * @param regexTopic regular expression handed to {@code RegexUtil.isMatch}
     * @param isTakeout  whether each message is removed from its queue
     * @return the non-null messages obtained, or {@code null} when no topic matches
     */
    @Override
    public List<M> takeMessageByRegexTopicNoBlock(String regexTopic, boolean isTakeout) {
        if (!hasRegexTopic(regexTopic)) {
            return null;
        }
        List<String> matchedTopics = new ArrayList<>();
        for (String candidate : messageQueues.keySet()) {
            if (RegexUtil.isMatch(candidate, regexTopic)) {
                matchedTopics.add(candidate);
            }
        }
        log.debug("获取到正则主题：{}的匹配主题{}个", regexTopic, matchedTopics.size());
        if (matchedTopics.isEmpty()) {
            return null;
        }
        return matchedTopics.stream()
                .map(matched -> takeMessageNoBlock(matched, isTakeout))
                .filter(Objects::nonNull)
                .collect(Collectors.toCollection(LinkedList::new));
    }

    /**
     * Subscribes to a topic: returns its queue and increments the topic's subscription counter.
     *
     * @param topic topic name
     * @return the queue registered for the topic, or {@code null} if the topic is unknown or
     * {@code null}
     */
    @Override
    public MessageQueue<MessageEntry<M>> subscriptionMessage(String topic) {
        if (topic == null) {
            return null;
        }
        MessageQueue<MessageEntry<M>> messageQueue = messageQueues.get(topic);
        if (messageQueue != null && subscriptionCount != null) {
            // computeIfAbsent guards against a missing counter instead of dereferencing null.
            subscriptionCount.computeIfAbsent(topic, key -> new AtomicLong(0)).incrementAndGet();
        }
        return messageQueue;
    }

    /**
     * Subscribes to a topic, waiting up to the given time for the topic to be registered.
     * <p>The calling thread polls for the topic itself. A subscription obtained after waiting is
     * counted exactly like an immediate one, because every attempt goes through
     * {@link #subscriptionMessage(String)}.</p>
     *
     * @param topic topic name
     * @param wait  maximum time to wait for the topic to appear
     * @param unit  unit of {@code wait}
     * @return the queue of the topic, or {@code null} if it did not appear in time, {@code unit}
     * is {@code null}, or the thread was interrupted (the interrupt status is restored)
     */
    @Override
    public MessageQueue<MessageEntry<M>> subscriptionMessage(String topic, long wait, TimeUnit unit) {
        MessageQueue<MessageEntry<M>> messageQueue = subscriptionMessage(topic);
        if (messageQueue != null) {
            return messageQueue;
        }
        if (unit == null) {
            log.error("阻塞订阅时发生异常：{}", "时间单位为空");
            return null;
        }
        // Poll on the calling thread and bound the wait by real elapsed time.
        final long timeoutNanos = unit.toNanos(wait);
        final long startNanos = System.nanoTime();
        try {
            do {
                messageQueue = subscriptionMessage(topic);
                if (messageQueue != null) {
                    return messageQueue;
                }
                Thread.sleep(1);
            } while (System.nanoTime() - startNanos <= timeoutNanos);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("阻塞订阅时发生异常：{}", e.getMessage());
        }
        return null;
    }

    /**
     * Unsubscribes from a topic using the default observation window of five seconds.
     *
     * @param topic topic name
     */
    @Override
    public void unsubscribe(String topic) {
        final long defaultWait = 5;
        final TimeUnit defaultUnit = TimeUnit.SECONDS;
        unsubscribe(topic, defaultWait, defaultUnit);
    }

    /**
     * Unsubscribes from a topic. When the last subscriber leaves and the queue is empty, the
     * topic is observed for the given time and removed automatically if, during that window, no
     * message is published and no new subscriber appears.
     *
     * @param topic topic name; unknown or {@code null} topics are ignored
     * @param wait  length of the observation window
     * @param unit  unit of {@code wait}
     */
    @Override
    public void unsubscribe(String topic, long wait, TimeUnit unit) {
        if (subscriptionCount == null || topic == null) {
            return;
        }
        AtomicLong subscribers = subscriptionCount.get(topic);
        if (subscribers == null) {
            // Nothing was ever registered under this topic, so there is nothing to release.
            return;
        }
        // Decrement and clamp at zero in one atomic step.
        long remaining = subscribers.updateAndGet(current -> current > 0 ? current - 1 : 0);
        if (remaining != 0 || length(topic) != 0) {
            return;
        }
        // No subscriber and no backlog: watch the topic and drop it if it stays idle.
        long w = unit.toMillis(wait);
        final long timeoutNanos = unit.toNanos(wait);
        log.debug("主题为{}的消息队列目前订阅数为0且积压消息为0，当{}ms后若无生产者发布消息将自动删除该主题队列", topic, w);
        ScheduleTaskUtil.build(true, "temporaryProducerListener").EOneTimeTask(() -> {
            final long startNanos = System.nanoTime();
            boolean flag = true;
            do {
                // Re-read the counter each round: re-registering a topic replaces it.
                AtomicLong current = subscriptionCount.get(topic);
                boolean resubscribed = current != null && current.get() > 0;
                if (resubscribed || length(topic) > 0) {
                    flag = false;
                    break;
                }
                Thread.sleep(1);
            } while (System.nanoTime() - startNanos <= timeoutNanos);
            if (flag) {
                removeMessageQueue(topic);
                log.debug("主题：{}自动删除完成", topic);
            }
            return flag;
        }, 1, TimeUnit.MILLISECONDS);
    }

    /**
     * Returns the number of messages currently queued for a topic.
     *
     * @param topic topic name, may be {@code null}
     * @return the queue length, or 0 if the topic is unknown or {@code null}
     */
    @Override
    public int length(String topic) {
        // One lookup instead of containsKey() + get(): the topic may vanish in between.
        MessageQueue<MessageEntry<M>> queue = topic == null ? null : messageQueues.get(topic);
        return queue == null ? 0 : queue.length();
    }

    /**
     * Expires a message by removing it from the queue of the given topic.
     *
     * @param topic        topic name
     * @param messageEntry entry to remove
     * @throws IllegalArgumentException if the entry is {@code null} or the topic is unknown
     * @throws ErrorExpiredException    if the entry could not be removed
     */
    @Override
    public void expire(String topic, MessageEntry<M> messageEntry) throws ErrorExpiredException {
        // Single, null-safe lookup so that an unknown or null topic always ends up as the
        // documented IllegalArgumentException.
        MessageQueue<MessageEntry<M>> queue = topic == null ? null : messageQueues.get(topic);
        if (messageEntry == null || queue == null) {
            throw new IllegalArgumentException("参数有误，ID非法或主题不存在");
        }
        try {
            boolean flag = queue.remove(messageEntry);
            if (!flag) {
                throw new ErrorExpiredException("移出失败");
            }
        } catch (Exception e) {
            // Log the throwable itself so the failure reason and stack trace are kept.
            log.error("过期时发生异常", e);
            throw new ErrorExpiredException(e.getMessage());
        }
    }

    /**
     * Shuts the context down: stops the expiration listener, then releases the queues.
     * The context must not be used afterwards.
     */
    @Override
    public void destroy() {
        // Stop the listener first so that no new expiration run starts against a released map.
        if (isOpenListener) {
            try {
                if (executorService != null) {
                    executorService.shutdown();
                }
                isOpenListener = false;
            } catch (Exception e) {
                log.error("关闭守护线程发生异常：{}", e.getMessage(), e);
            }
        }
        // Dropping the reference is enough; no explicit System.gc() is needed.
        messageQueues = null;
    }

    /**
     * <p>Expires the element the iterator currently points at. Iterators are only weakly
     * consistent, so in a multi-threaded environment insertions and removals may happen while
     * the iterator is in use. This queue inserts strictly at the tail, so insertions are
     * observed; removals happen at the head, so the element may already be gone when the
     * iterator advances. In that case the traversal should stop and wait for the next round.</p>
     *
     * @param iterator iterator positioned on the element to expire
     * @throws ErrorExpiredException if the iterator is {@code null}, does not support removal,
     *                               or is not in a state that allows removal
     * @author Huang Yongxiang
     * @date 2022/3/21 11:42
     */
    public void expire(Iterator<MessageEntry<M>> iterator) throws ErrorExpiredException {
        if (iterator == null) {
            throw new ErrorExpiredException("过期失败，迭代器为空");
        }
        String failure;
        try {
            iterator.remove();
            return;
        } catch (UnsupportedOperationException unsupported) {
            failure = "过期失败，该迭代器不支持移除操作";
        } catch (IllegalStateException illegalState) {
            failure = "过期失败，可能发生删除操作";
        }
        throw new ErrorExpiredException(failure);
    }

    /**
     * Publishes a message without blocking, using the default expiring time.
     *
     * @param message payload, must not be {@code null}
     * @param topic   topic of an already registered queue
     * @return {@code true} if the message was enqueued; {@code false} if the topic is unknown,
     * the message is {@code null}, or the queue did not accept it
     */
    @Override
    public boolean publishMessageNoBlock(M message, String topic) {
        // Single lookup avoids a NullPointerException when the topic is removed between a
        // containsKey() check and the subsequent get().
        MessageQueue<MessageEntry<M>> queue = topic == null ? null : messageQueues.get(topic);
        if (queue == null) {
            log.error("发布非阻塞消息失败，所要发布到的队列主题：{}不存在", topic);
            return false;
        }
        if (message == null) {
            log.error("禁止发布空消息");
            return false;
        }
        MessageEntry<M> messageEntry = new MessageEntry<>();
        messageEntry.setMessageId(IdGenerator.getNextIdAsLong());
        messageEntry.setMessage(message);
        // A non-positive default means the message never expires (marked with -1).
        messageEntry.setExpiringTime(defaultExpiringTime > 0 ? defaultExpiringTime + SystemClock.now() : -1);
        return queue.publishMessage(messageEntry);
    }

    /**
     * Publishes a message using the default expiring time, blocking while the queue is full.
     *
     * @param message payload, must not be {@code null}
     * @param topic   topic of an already registered queue
     * @return {@code true} if the message was enqueued; {@code false} if the topic is unknown,
     * the message is {@code null}, or the thread was interrupted while waiting (the interrupt
     * status is restored in that case)
     */
    public boolean publishMessageBlock(M message, String topic) {
        // Single lookup avoids a NullPointerException when the topic is removed between a
        // containsKey() check and the subsequent get().
        MessageQueue<MessageEntry<M>> queue = topic == null ? null : messageQueues.get(topic);
        if (queue == null) {
            log.error("发布阻塞消息失败，所要发布到的队列主题：{}不存在", topic);
            return false;
        }
        if (message == null) {
            log.error("禁止发布空消息");
            return false;
        }
        try {
            MessageEntry<M> messageEntry = new MessageEntry<>();
            messageEntry.setMessageId(IdGenerator.getNextIdAsLong());
            messageEntry.setMessage(message);
            // A non-positive default means the message never expires (marked with -1).
            messageEntry.setExpiringTime(defaultExpiringTime > 0 ? defaultExpiringTime + SystemClock.now() : -1);
            return queue.publishMessageSync(messageEntry);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("发布可阻塞消息发生异常，等待时被异常占用：{}", e.getMessage());
        }
        return false;
    }

    /**
     * Starts the single-threaded expiration listener: one daemon thread scans every queue once
     * per millisecond and removes the entries whose expiring time has passed.
     */
    private void scheduleExpiringCheckSingleThread() {
        executorService = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "ExpiringCheckSingleThread");
            thread.setDaemon(true);
            return thread;
        });
        executorService.scheduleAtFixedRate(() -> {
            // scheduleAtFixedRate cancels all further runs once a run throws, so nothing may
            // escape from this task.
            try {
                Map<String, MessageQueue<MessageEntry<M>>> queues = messageQueues;
                if (queues == null) {
                    // The context was destroyed.
                    return;
                }
                // Iterate over a snapshot so topics registered or removed meanwhile do not
                // disturb the traversal.
                List<Map.Entry<String, MessageQueue<MessageEntry<M>>>> snapshot = new ArrayList<>(queues.entrySet());
                for (Map.Entry<String, MessageQueue<MessageEntry<M>>> entry : snapshot) {
                    for (Iterator<MessageEntry<M>> it = entry.getValue().iterator(); it.hasNext(); ) {
                        MessageEntry<M> message = it.next();
                        // Expire the entry once its expiring time is reached; negative = never.
                        if (message.expiringTime >= 0 && message.expiringTime <= SystemClock.now()) {
                            try {
                                log.debug("messageId：{}已过期", message.messageId);
                                expire(it);
                            } catch (ErrorExpiredException e) {
                                log.error("主题：{}，消息ID：{}过期失败：{}", entry.getKey(), message.getMessageId(), e.getMessage());
                            }
                        }
                    }
                }
            } catch (Exception e) {
                log.error("过期检查发生异常：{}", e.getMessage(), e);
            }
        }, 1, 1, TimeUnit.MILLISECONDS);
    }

    /**
     * Starts the per-topic expiration listener: once per second a coordinator thread hands every
     * topic to its own short-lived daemon thread, which scans that topic's queue and removes the
     * entries whose expiring time has passed.
     */
    private void scheduleExpiringCheckTopicConcurrency() {
        executorService = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "ExpiringCheckTopicConcurrencyThread");
            thread.setDaemon(true);
            return thread;
        });
        executorService.scheduleAtFixedRate(() -> {
            // scheduleAtFixedRate cancels all further runs once a run throws, so nothing may
            // escape from this task.
            try {
                Map<String, MessageQueue<MessageEntry<M>>> queues = messageQueues;
                if (queues == null) {
                    // The context was destroyed.
                    return;
                }
                // Iterate over a snapshot so topics registered or removed meanwhile do not
                // disturb the traversal.
                List<Map.Entry<String, MessageQueue<MessageEntry<M>>>> snapshot = new ArrayList<>(queues.entrySet());
                for (Map.Entry<String, MessageQueue<MessageEntry<M>>> entry : snapshot) {
                    ScheduledExecutorService queueScheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
                        Thread thread = new Thread(runnable, String.format("%sQueueListener", entry.getKey()));
                        thread.setDaemon(true);
                        return thread;
                    });
                    try {
                        queueScheduler.schedule(() -> {
                            // Lock per queue, not globally: scans of the same topic never
                            // overlap, while different topics are scanned concurrently.
                            synchronized (entry.getValue()) {
                                for (Iterator<MessageEntry<M>> it = entry.getValue().iterator(); it.hasNext(); ) {
                                    MessageEntry<M> message = it.next();
                                    // Expire once the expiring time is reached; negative = never.
                                    if (message.expiringTime >= 0 && message.expiringTime <= SystemClock.now()) {
                                        try {
                                            log.debug("messageId：{}已过期", message.messageId);
                                            expire(it);
                                        } catch (ErrorExpiredException e) {
                                            log.error("主题：{}，消息ID：{}过期失败：{}", entry.getKey(), message.getMessageId(), e.getMessage());
                                        }
                                    }
                                }
                            }
                        }, 1, TimeUnit.MILLISECONDS);
                    } finally {
                        // Without shutdown() every round would leak one thread per topic. The
                        // already scheduled scan still runs (default policy), then the worker
                        // thread terminates.
                        queueScheduler.shutdown();
                    }
                }
            } catch (Exception e) {
                log.error("过期检查发生异常：{}", e.getMessage(), e);
            }
        }, 1000, 1000, TimeUnit.MILLISECONDS);
    }


    /**
     * Builder for {@link MessageQueueContext}. Setters are generated by Lombok and can be chained.
     *
     * @param <M> upper bound of the message type of the contexts this builder creates
     */
    @Setter
    @Accessors(chain = true)
    public static class Builder<M> {
        /**
         * Default expiring time in milliseconds, used for messages published without an explicit
         * expiring time. A value of -1 makes those messages permanent: they stay queued until a
         * consumer takes them out. Use with caution!
         */
        private long defaultExpiringTime = -1;

        /**
         * Whether an expiring time may be set on an individual message.
         */
        private boolean isAllowSetEntryExpired = false;

        /**
         * Maximum number of topics allowed.
         */
        private int allowMaxTopicCount = 255;

        /**
         * Maximum number of messages allowed in each queue.
         */
        private int allowMaxMessageCountPerQueue = 1000;

        /**
         * Policy of the expiration listener.
         */
        private ExpirationListenerPolicy listenerPolicy = ExpirationListenerPolicy.SINGLE_THREAD;

        /**
         * Sets the default expiring time in an arbitrary unit of at least millisecond granularity.
         *
         * @param defaultExpiringTime time to live applied to messages without their own expiry
         * @param unit                unit of {@code defaultExpiringTime}; units finer than
         *                            milliseconds are rejected
         * @return this builder
         * @throws IllegalArgumentException if {@code unit} is finer than milliseconds
         */
        public Builder<M> setDefaultExpiringTime(long defaultExpiringTime, TimeUnit unit) {
            // Both sub-millisecond units must be rejected: the value is stored in milliseconds
            // and would otherwise be silently truncated.
            if (unit == TimeUnit.MICROSECONDS || unit == TimeUnit.NANOSECONDS) {
                log.error("最小支持毫秒级");
                throw new IllegalArgumentException("最小支持毫秒级");
            }
            this.defaultExpiringTime = unit.toMillis(defaultExpiringTime);
            return this;
        }

        /**
         * Creates a context configured with the current builder settings.
         *
         * @param <M1> message type of the created context
         * @return a new, empty context
         */
        public <M1 extends M> MessageQueueContext<M1> build() {
            MessageQueueContext<M1> messageQueueContext = new MessageQueueContext<>();
            messageQueueContext.messageQueues = new Hashtable<>(this.allowMaxTopicCount);
            messageQueueContext.isAllowSetEntryExpired = this.isAllowSetEntryExpired;
            messageQueueContext.allowMaxMessageCountPerQueue = this.allowMaxMessageCountPerQueue;
            messageQueueContext.defaultExpiringTime = this.defaultExpiringTime;
            messageQueueContext.allowMaxTopicCount = this.allowMaxTopicCount;
            messageQueueContext.listenerPolicy = this.listenerPolicy;
            return messageQueueContext;
        }

    }

    /**
     * Thin wrapper around a {@link BlockingQueue} holding the messages of one topic.
     * Publishing is private, so only the enclosing class can add messages; consumers get the
     * read and remove operations.
     *
     * @param <M> element type
     */
    public static class MessageQueue<M> {
        private final BlockingQueue<M> messageQueue;

        /**
         * Creates a queue without an explicit capacity limit.
         */
        public MessageQueue() {
            messageQueue = new LinkedBlockingQueue<>();
        }


        /**
         * Creates a queue holding at most {@code maxLength} elements.
         *
         * @param maxLength capacity, must be positive
         * @throws IllegalArgumentException if {@code maxLength} is zero or negative
         */
        public MessageQueue(int maxLength) {
            if (maxLength <= 0) {
                // Zero is rejected as well, so the capacity has to be positive.
                throw new IllegalArgumentException("最大容量应该为正数");
            }
            messageQueue = new LinkedBlockingDeque<>(maxLength);
        }

        /**
         * Removes and returns the head, waiting until an element is available.
         *
         * @return the head element
         * @throws InterruptedException if interrupted while waiting
         */
        public M takeMessageSync() throws InterruptedException {
            return messageQueue.take();
        }

        /**
         * Removes and returns the head without waiting.
         *
         * @return the head element, or {@code null} if the queue is empty
         */
        public M takeMessage() {
            return messageQueue.poll();
        }

        /**
         * Returns the head without removing it.
         *
         * @return the head element, or {@code null} if the queue is empty
         */
        public M readMessage() {
            return messageQueue.peek();
        }

        /**
         * Returns the head without removing it, failing on an empty queue.
         *
         * @return the head element
         * @throws java.util.NoSuchElementException if the queue is empty
         */
        public M tryReadMessage() {
            return messageQueue.element();
        }

        /**
         * Removes every element matching the predicate.
         *
         * @param predicate condition for removal; {@code null} removes nothing
         * @return {@code true} if at least one element was removed
         */
        public boolean removeIf(Predicate<? super M> predicate) {
            if (predicate == null) {
                return false;
            }
            return messageQueue.removeIf(predicate);
        }

        /**
         * @return the number of elements currently queued
         */
        public int length() {
            return messageQueue.size();
        }

        /**
         * Removes a single occurrence of the given element.
         *
         * @param message element to remove; {@code null} removes nothing
         * @return {@code true} if an element was removed
         */
        public boolean remove(M message) {
            if (message == null) {
                return false;
            }
            return messageQueue.remove(message);
        }

        /**
         * @return an iterator over the queued elements, head first
         */
        public Iterator<M> iterator() {
            return messageQueue.iterator();
        }

        /**
         * Appends an element, waiting for free space if the queue is full.
         *
         * @param message element to append
         * @return {@code true} if appended, {@code false} if {@code message} is {@code null}
         * @throws InterruptedException if interrupted while waiting
         */
        private boolean publishMessageSync(M message) throws InterruptedException {
            if (message == null) {
                return false;
            }
            // The InterruptedException simply propagates; callers log it, so the stack trace is
            // no longer dumped to stderr here.
            messageQueue.put(message);
            return true;
        }

        /**
         * Appends an element without waiting.
         *
         * @param message element to append
         * @return {@code true} if appended; {@code false} if {@code message} is {@code null} or
         * the queue is full
         */
        private boolean publishMessage(M message) {
            // offer() on the queue types created above only throws for null elements, which are
            // filtered out here, so no catch block is needed.
            return message != null && messageQueue.offer(message);
        }

    }

    /**
     * Envelope stored in the queues: the payload together with its id and expiring time.
     * Getters are public; setters are generated private, so only the enclosing class can
     * modify an entry.
     *
     * @param <M> payload type
     */
    @Getter
    @Setter(AccessLevel.PRIVATE)
    public static class MessageEntry<M> {
        // Identifier assigned when the message is published.
        long messageId;
        // The payload itself.
        M message;
        // Timestamp at which the entry expires; the expiration checks skip negative values.
        long expiringTime;

        /**
         * Creates an empty entry whose fields are filled in afterwards through the setters.
         */
        public MessageEntry() {
        }

        /**
         * Creates a fully initialised entry.
         *
         * @param messageId    identifier of the message
         * @param message      payload
         * @param expiringTime timestamp at which the entry expires
         */
        public MessageEntry(long messageId, M message, long expiringTime) {
            this.expiringTime = expiringTime;
            this.message = message;
            this.messageId = messageId;
        }
    }

    /**
     * Ad-hoc throughput benchmark: several producers publish into one topic each while the same
     * number of consumers drain those topics; the elapsed time is logged once the expected
     * number of messages has been consumed.
     * <p>NOTE(review): messages are published with a 3 ms default expiring time, so some may
     * expire before they are consumed and the target count may never be reached — confirm that
     * this is intended.</p>
     *
     * @param args unused
     */
    public static void main(String[] args) {
        int producerCount = 10;
        int consumerCount = 10;
        long maxMessageCount = 10000000;
        AtomicLong getMessageAccount = new AtomicLong(0);
        AtomicBoolean isFinished = new AtomicBoolean(false);
        int perProducerCreate = (int) (maxMessageCount / producerCount);
        MessageQueueContext<String> messageQueueContext = MessageQueueContext.builder()
                .setAllowMaxTopicCount(producerCount)
                .setListenerPolicy(ExpirationListenerPolicy.SINGLE_THREAD)
                .setAllowMaxMessageCountPerQueue(perProducerCreate)
                .setDefaultExpiringTime(3, TimeUnit.MILLISECONDS)
                .setAllowSetEntryExpired(true)
                .build();
        for (int i = 0; i < producerCount; i++) {
            messageQueueContext.registerMessageQueue(i + "");
        }

        Runnable create = () -> {
            MessageProducer<String> producer = new MessageProducer<>(messageQueueContext);
            // Each producer publishes to the topic named after its thread.
            String topicName = Thread.currentThread().getName();
            for (int i = 0; i < perProducerCreate; i++) {
                producer.publishMessageNoBlock(StringUtils.getRandomStr(10), topicName);
            }
        };

        Runnable get = () -> {
            MessageConsumer<String> consumer = new MessageConsumer<>(messageQueueContext);
            String topicName = Thread.currentThread().getName();
            while (!isFinished.get()) {
                consumer.takeMessageBlock(topicName, true);
                // Use the value returned by the increment itself: reading the counter again
                // races with the other consumers and can skip the exact target value.
                long consumed = getMessageAccount.incrementAndGet();
                if (consumed % 10000 == 0) {
                    log.info("已消费消息：{}条", consumed);
                }
                if (consumed >= maxMessageCount) {
                    isFinished.set(true);
                    break;
                }
            }
        };

        long start = System.currentTimeMillis();

        for (int i = 0; i < producerCount; i++) {
            Thread thread = new Thread(create);
            thread.setName("" + i);
            thread.start();
        }

        for (int i = 0; i < consumerCount; i++) {
            Thread thread = new Thread(get);
            thread.setName("" + i);
            // Consumers still blocked on an empty queue must not keep the JVM alive once the
            // benchmark has finished.
            thread.setDaemon(true);
            thread.start();
        }

        Thread thread = new Thread(() -> {
            while (!isFinished.get()) {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            long end = System.currentTimeMillis();
            log.info("测试完成！消费者数目：{}，生产者数目：{}，消息总数：{}，总计耗时：{}s", consumerCount, producerCount, maxMessageCount, (end - start) / 1000);
        });
        thread.start();


    }


}
